use std::{ffi::OsString, future::Future, sync::Arc, time::Duration};

use anyhow::Result;
use futures::{FutureExt, future::BoxFuture};
use parking_lot::Mutex;
use tokio::{fs, select, sync::mpsc::{self, UnboundedReceiver}, task::JoinHandle};
use yazi_config::{TASKS, plugin::{Fetcher, Preloader}};
use yazi_dds::Pump;
use yazi_fs::{must_be_dir, remove_dir_clean, unique_name};
use yazi_proxy::{ManagerProxy, options::{PluginOpt, ProcessExecOpt}};
use yazi_shared::{Throttle, url::Url};

use super::{Ongoing, TaskProg, TaskStage};
use crate::{HIGH, LOW, NORMAL, TaskKind, TaskOp, file::{File, FileOpDelete, FileOpHardlink, FileOpLink, FileOpPaste, FileOpTrash}, plugin::{Plugin, PluginOpEntry}, prework::{Prework, PreworkOpFetch, PreworkOpLoad, PreworkOpSize}, process::{Process, ProcessOpBg, ProcessOpBlock, ProcessOpOrphan}};

/// Task scheduler: owns the workers, the channels that feed the spawned
/// background tasks, and the registry of ongoing tasks.
pub struct Scheduler {
	/// Worker used for the paste, link, hardlink, delete and trash operations.
	pub file:    Arc<File>,
	/// Worker used to run micro and macro plugins.
	pub plugin:  Arc<Plugin>,
	/// Worker used for fetchers, preloaders and size calculation.
	pub prework: Arc<Prework>,
	/// Worker used to run opener commands (blocking, orphaned or in background).
	pub process: Arc<Process>,

	/// Sending half of the prioritized queue of boxed futures ("micro" jobs);
	/// the receiving half is polled by both the micro and the macro workers.
	micro:       async_priority_channel::Sender<BoxFuture<'static, ()>, u8>,
	/// Sending half of the progress channel; the events are consumed by the
	/// task spawned in `progress`.
	prog:        mpsc::UnboundedSender<TaskProg>,
	/// Join handles of the spawned background tasks, aborted by `shutdown`.
	handles:     Vec<JoinHandle<()>>,
	/// Registry of the ongoing tasks and their hooks, shared with the workers.
	pub ongoing: Arc<Mutex<Ongoing>>,
}

impl Scheduler {
	/// Creates the scheduler and spawns all of its background tasks.
	///
	/// Three channels are set up: a prioritized "micro" queue of boxed futures,
	/// a prioritized "macro" queue of [`TaskOp`]s, and an unbounded progress
	/// channel of [`TaskProg`] events. `TASKS.micro_workers` micro workers,
	/// `TASKS.macro_workers` macro workers and one progress task are then
	/// started with `tokio::spawn`, so this must be called from within a Tokio
	/// runtime.
	///
	/// Every spawned task is recorded in `handles` so that [`Self::shutdown`]
	/// can abort all of them.
	pub fn serve() -> Self {
		let (micro_tx, micro_rx) = async_priority_channel::unbounded();
		let (macro_tx, macro_rx) = async_priority_channel::unbounded();
		let (prog_tx, prog_rx) = mpsc::unbounded_channel();

		let mut scheduler = Self {
			file:    Arc::new(File::new(macro_tx.clone(), prog_tx.clone())),
			plugin:  Arc::new(Plugin::new(macro_tx.clone(), prog_tx.clone())),
			prework: Arc::new(Prework::new(macro_tx.clone(), prog_tx.clone())),
			process: Arc::new(Process::new(prog_tx.clone())),

			micro:   micro_tx,
			prog:    prog_tx,
			// One slot per worker, plus one for the progress task.
			handles: Vec::with_capacity(TASKS.micro_workers as usize + TASKS.macro_workers as usize + 1),
			ongoing: Default::default(),
		};

		for _ in 0..TASKS.micro_workers {
			scheduler.handles.push(scheduler.schedule_micro(micro_rx.clone()));
		}
		for _ in 0..TASKS.macro_workers {
			scheduler.handles.push(scheduler.schedule_macro(micro_rx.clone(), macro_rx.clone()));
		}

		// Keep the progress task's handle as well (the capacity above already
		// accounts for it); otherwise `shutdown` would leave it running.
		scheduler.handles.push(scheduler.progress(prog_rx));
		scheduler
	}

	/// Cancels the task identified by `id`.
	///
	/// If the task has a hook registered, the hook is taken out, invoked with
	/// `canceled = true`, and the resulting future is queued on the micro
	/// channel with `HIGH` priority; `false` is returned in that case.
	/// Otherwise the task is removed from `all`, and the return value tells
	/// whether it was present.
	pub fn cancel(&self, id: usize) -> bool {
		let mut registry = self.ongoing.lock();

		match registry.hooks.remove(&id) {
			Some(hook) => {
				let cleanup = hook(true);
				self.micro.try_send(cleanup, HIGH).ok();
				false
			}
			None => registry.all.remove(&id).is_some(),
		}
	}

	/// Aborts every background task whose handle is stored in `handles`.
	pub fn shutdown(&self) {
		self.handles.iter().for_each(|worker| worker.abort());
	}

	pub fn file_cut(&self, from: Url, mut to: Url, force: bool) {
		let mut ongoing = self.ongoing.lock();
		let id = ongoing.add(TaskKind::User, format!("Cut {from} to {to}"));

		if to.starts_with(&from) && to != from {
			self.new_and_fail(id, "Cannot cut directory into itself").ok();
			return;
		}

		ongoing.hooks.insert(id, {
			let ongoing = self.ongoing.clone();
			let (from, to) = (from.clone(), to.clone());

			Box::new(move |canceled: bool| {
				async move {
					if !canceled {
						remove_dir_clean(&from).await;
						Pump::push_move(from, to);
					}
					ongoing.lock().try_remove(id, TaskStage::Hooked);
				}
				.boxed()
			})
		});

		let file = self.file.clone();
		self.send_micro(id, LOW, async move {
			if !force {
				to = unique_name(to, must_be_dir(&from)).await?;
			}
			file.paste(FileOpPaste { id, from, to, cha: None, cut: true, follow: false, retry: 0 }).await
		});
	}

	pub fn file_copy(&self, from: Url, mut to: Url, force: bool, follow: bool) {
		let id = self.ongoing.lock().add(TaskKind::User, format!("Copy {from} to {to}"));

		if to.starts_with(&from) && to != from {
			self.new_and_fail(id, "Cannot copy directory into itself").ok();
			return;
		}

		let file = self.file.clone();
		self.send_micro(id, LOW, async move {
			if !force {
				to = unique_name(to, must_be_dir(&from)).await?;
			}
			file.paste(FileOpPaste { id, from, to, cha: None, cut: false, follow, retry: 0 }).await
		});
	}

	pub fn file_link(&self, from: Url, mut to: Url, relative: bool, force: bool) {
		let id = self.ongoing.lock().add(TaskKind::User, format!("Link {from} to {to}"));

		let file = self.file.clone();
		self.send_micro(id, LOW, async move {
			if !force {
				to = unique_name(to, must_be_dir(&from)).await?;
			}
			file
				.link(FileOpLink { id, from, to, cha: None, resolve: false, relative, delete: false })
				.await
		});
	}

	pub fn file_hardlink(&self, from: Url, mut to: Url, force: bool, follow: bool) {
		let id = self.ongoing.lock().add(TaskKind::User, format!("Hardlink {from} to {to}"));

		if to.starts_with(&from) && to != from {
			self.new_and_fail(id, "Cannot hardlink directory into itself").ok();
			return;
		}

		let file = self.file.clone();
		self.send_micro(id, LOW, async move {
			if !force {
				to = unique_name(to, must_be_dir(&from)).await?;
			}
			file.hardlink(FileOpHardlink { id, from, to, cha: None, follow }).await
		});
	}

	pub fn file_delete(&self, target: Url) {
		let mut ongoing = self.ongoing.lock();
		let id = ongoing.add(TaskKind::User, format!("Delete {target}"));

		ongoing.hooks.insert(id, {
			let target = target.clone();
			let ongoing = self.ongoing.clone();

			Box::new(move |canceled: bool| {
				async move {
					if !canceled {
						fs::remove_dir_all(&target).await.ok();
						ManagerProxy::update_tasks(&target);
						Pump::push_delete(target);
					}
					ongoing.lock().try_remove(id, TaskStage::Hooked);
				}
				.boxed()
			})
		});

		let file = self.file.clone();
		self.send_micro(
			id,
			LOW,
			async move { file.delete(FileOpDelete { id, target, length: 0 }).await },
		);
	}

	/// Registers a user task that moves `target` to the trash and queues it
	/// with `LOW` priority.
	///
	/// A hook is installed for the task: when it is invoked without
	/// cancellation, it notifies `ManagerProxy::update_tasks` and publishes
	/// the operation through `Pump::push_trash`; in either case it then
	/// advances the task past the `Hooked` stage.
	pub fn file_trash(&self, target: Url) {
		let mut ongoing = self.ongoing.lock();
		let id = ongoing.add(TaskKind::User, format!("Trash {target}"));

		ongoing.hooks.insert(id, {
			let target = target.clone();
			let ongoing = self.ongoing.clone();

			Box::new(move |canceled: bool| {
				async move {
					if !canceled {
						ManagerProxy::update_tasks(&target);
						Pump::push_trash(target);
					}
					ongoing.lock().try_remove(id, TaskStage::Hooked);
				}
				.boxed()
			})
		});

		let file = self.file.clone();
		// `target` is owned by the future and not used afterwards, so it is
		// moved into the operation instead of being cloned once more.
		self.send_micro(id, LOW, async move { file.trash(FileOpTrash { id, target, length: 0 }).await });
	}

	pub fn plugin_micro(&self, opt: PluginOpt) {
		let id = self.ongoing.lock().add(TaskKind::User, format!("Run micro plugin `{}`", opt.id));

		let plugin = self.plugin.clone();
		self.send_micro(id, NORMAL, async move { plugin.micro(PluginOpEntry { id, opt }).await });
	}

	/// Registers a user task for the plugin described by `opt` and hands its
	/// macro entry to the plugin worker; an error from that hand-off is ignored.
	pub fn plugin_macro(&self, opt: PluginOpt) {
		let title = format!("Run macro plugin `{}`", opt.id);
		let id = self.ongoing.lock().add(TaskKind::User, title);

		let entry = PluginOpEntry { id, opt };
		self.plugin.macro_(entry).ok();
	}

	pub fn fetch_paged(&self, fetcher: &'static Fetcher, targets: Vec<yazi_fs::File>) {
		let id = self.ongoing.lock().add(
			TaskKind::Preload,
			format!("Run fetcher `{}` with {} target(s)", fetcher.run.name, targets.len()),
		);

		let prework = self.prework.clone();
		self.send_micro(id, NORMAL, async move {
			prework.fetch(PreworkOpFetch { id, plugin: fetcher, targets }).await
		});
	}

	pub fn preload_paged(&self, preloader: &'static Preloader, target: &yazi_fs::File) {
		let id =
			self.ongoing.lock().add(TaskKind::Preload, format!("Run preloader `{}`", preloader.run.name));

		let target = target.clone();
		let prework = self.prework.clone();
		self.send_micro(id, NORMAL, async move {
			prework.load(PreworkOpLoad { id, plugin: preloader, target }).await
		});
	}

	pub fn prework_size(&self, targets: Vec<&Url>) {
		let throttle = Arc::new(Throttle::new(targets.len(), Duration::from_millis(300)));
		let mut ongoing = self.ongoing.lock();

		for target in targets {
			let id = ongoing.add(TaskKind::Preload, format!("Calculate the size of {target}"));
			let target = target.clone();
			let throttle = throttle.clone();

			let prework = self.prework.clone();
			self.send_micro(id, NORMAL, async move {
				prework.size(PreworkOpSize { id, target, throttle }).await
			});
		}
	}

	pub fn process_open(&self, ProcessExecOpt { cwd, opener, args, done }: ProcessExecOpt) {
		let name = {
			let args = args.iter().map(|a| a.to_string_lossy()).collect::<Vec<_>>().join(" ");
			if args.is_empty() {
				format!("Run {:?}", opener.run)
			} else {
				format!("Run {:?} with `{args}`", opener.run)
			}
		};

		let (cancel_tx, cancel_rx) = mpsc::channel(1);
		let mut ongoing = self.ongoing.lock();

		let id = ongoing.add(TaskKind::User, name);
		ongoing.hooks.insert(id, {
			let ongoing = self.ongoing.clone();
			Box::new(move |canceled: bool| {
				async move {
					if canceled {
						cancel_tx.send(()).await.ok();
						cancel_tx.closed().await;
					}
					if let Some(tx) = done {
						tx.send(()).ok();
					}
					ongoing.lock().try_remove(id, TaskStage::Hooked);
				}
				.boxed()
			})
		});

		let cmd = OsString::from(&opener.run);
		let process = self.process.clone();
		self.send_micro(id, NORMAL, async move {
			if opener.block {
				process.block(ProcessOpBlock { id, cwd, cmd, args }).await
			} else if opener.orphan {
				process.orphan(ProcessOpOrphan { id, cwd, cmd, args }).await
			} else {
				process.bg(ProcessOpBg { id, cwd, cmd, args, cancel: cancel_rx }).await
			}
		});
	}

	/// Spawns a micro worker: a task that receives boxed futures from `rx`
	/// and drives them to completion one after another.
	///
	/// The worker exits once `rx` reports an error, i.e. when the channel has
	/// been closed and drained. Retrying in that state would make `recv` fail
	/// immediately every time and turn the worker into a busy loop that never
	/// yields.
	fn schedule_micro(
		&self,
		rx: async_priority_channel::Receiver<BoxFuture<'static, ()>, u8>,
	) -> JoinHandle<()> {
		tokio::spawn(async move {
			while let Ok((fut, _)) = rx.recv().await {
				fut.await;
			}
		})
	}

	/// Spawns a macro worker: a task that serves both the `micro` queue of
	/// boxed futures and the `macro_` queue of [`TaskOp`]s.
	///
	/// Ops whose id is no longer reported by `Ongoing::exists` are skipped.
	/// A failing op is reported through the progress channel as
	/// `TaskProg::Fail`.
	///
	/// The worker exits when neither channel can deliver anything anymore
	/// (both are closed and drained).
	fn schedule_macro(
		&self,
		micro: async_priority_channel::Receiver<BoxFuture<'static, ()>, u8>,
		macro_: async_priority_channel::Receiver<TaskOp, u8>,
	) -> JoinHandle<()> {
		let file = self.file.clone();
		let plugin = self.plugin.clone();
		let prework = self.prework.clone();

		let prog = self.prog.clone();
		let ongoing = self.ongoing.clone();

		tokio::spawn(async move {
			loop {
				select! {
					Ok((fut, _)) = micro.recv() => {
						fut.await;
					}
					Ok((op, _)) = macro_.recv() => {
						let id = op.id();
						// The guard is a temporary here, so it is released
						// before any of the awaits below.
						if !ongoing.lock().exists(id) {
							continue;
						}

						let result = match op {
							TaskOp::File(op) => file.work(*op).await,
							TaskOp::Plugin(op) => plugin.work(*op).await,
							TaskOp::Prework(op) => prework.work(*op).await,
						};

						if let Err(e) = result {
							prog.send(TaskProg::Fail(id, format!("Failed to work on this task: {e:?}"))).ok();
						}
					}
					// A branch whose pattern does not match is disabled; without
					// this fallback `select!` panics once both receivers fail.
					else => break,
				}
			}
		})
	}

	/// Spawns the task that applies [`TaskProg`] events from `rx` to the
	/// registry of ongoing tasks. It runs until the progress channel is closed.
	///
	/// - `New` bumps the task's `total` counter and adds to `found`.
	/// - `Adv` adds to `succ` and `processed`; when `succ` is positive it also
	///   tries to advance the task past the `Pending` stage.
	/// - `Succ` tries to advance the task past the `Dispatched` stage.
	/// - `Fail` bumps `fail` and records the reason like a log line.
	/// - `Log` appends the line to the task's logs and forwards it to the
	///   task's logger, if there is one.
	///
	/// Whenever `try_remove` yields a future, it is queued on the micro channel
	/// with `LOW` priority.
	fn progress(&self, mut rx: UnboundedReceiver<TaskProg>) -> JoinHandle<()> {
		let (micro, ongoing) = (self.micro.clone(), self.ongoing.clone());

		tokio::spawn(async move {
			while let Some(event) = rx.recv().await {
				// Every event needs the registry exactly once for its whole
				// handling; the guard is released before the next `recv`.
				let mut registry = ongoing.lock();

				match event {
					TaskProg::New(id, size) => {
						if let Some(task) = registry.get_mut(id) {
							task.total += 1;
							task.found += size;
						}
					}
					TaskProg::Adv(id, succ, processed) => {
						if let Some(task) = registry.get_mut(id) {
							task.succ += succ;
							task.processed += processed;
						}

						let next = if succ > 0 { registry.try_remove(id, TaskStage::Pending) } else { None };
						if let Some(fut) = next {
							micro.try_send(fut, LOW).ok();
						}
					}
					TaskProg::Succ(id) => {
						if let Some(fut) = registry.try_remove(id, TaskStage::Dispatched) {
							micro.try_send(fut, LOW).ok();
						}
					}
					TaskProg::Fail(id, reason) => {
						if let Some(task) = registry.get_mut(id) {
							task.fail += 1;
							task.logs.push_str(&reason);
							task.logs.push('\n');

							if let Some(logger) = &task.logger {
								logger.send(reason).ok();
							}
						}
					}
					TaskProg::Log(id, line) => {
						if let Some(task) = registry.get_mut(id) {
							task.logs.push_str(&line);
							task.logs.push('\n');

							if let Some(logger) = &task.logger {
								logger.send(line).ok();
							}
						}
					}
				}
			}
		})
	}

	/// Queues `f` on the micro channel with the given `priority`.
	///
	/// The future is wrapped so that an error it returns is reported for task
	/// `id` as a `TaskProg::New` event followed by a `TaskProg::Fail` event.
	/// A failure to enqueue the wrapped future is ignored.
	fn send_micro<F>(&self, id: usize, priority: u8, f: F)
	where
		F: Future<Output = Result<()>> + Send + 'static,
	{
		let reporter = self.prog.clone();
		let job = async move {
			match f.await {
				Ok(()) => {}
				Err(e) => {
					reporter.send(TaskProg::New(id, 0)).ok();
					reporter.send(TaskProg::Fail(id, format!("Task initialization failed:\n{e:?}"))).ok();
				}
			}
		}
		.boxed();

		self.micro.try_send(job, priority).ok();
	}

	/// Reports task `id` as failed with `reason` by sending a `TaskProg::New`
	/// event followed by a `TaskProg::Fail` event.
	///
	/// # Errors
	///
	/// Returns the error of the first send that fails; the second event is not
	/// sent if the first one could not be.
	fn new_and_fail(&self, id: usize, reason: &str) -> Result<()> {
		self
			.prog
			.send(TaskProg::New(id, 0))
			.and_then(|()| self.prog.send(TaskProg::Fail(id, reason.to_owned())))?;
		Ok(())
	}
}
